{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "d1de0f1a",
   "metadata": {},
   "source": [
    "<a href=\"https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c8cbe152-de29-4240-8e13-f74dc146a658",
   "metadata": {},
   "source": [
    "# Parallelizing Ingestion Pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "17fd7dec-c846-4ae1-98ab-5436fac08668",
   "metadata": {},
   "source": [
    "In this notebook, we demonstrate how to execute ingestion pipelines using parallel processes. `IngestionPipeline` supports batched parallel execution in both sync (`run`) and async (`arun`) form, enabled through the `num_workers` argument."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "10db236d",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install llama-index-embeddings-openai"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e0963707-6ebe-4441-a363-1bfb48ce9df3",
   "metadata": {},
   "outputs": [],
   "source": [
    "import nest_asyncio\n",
    "\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bfe4bcbf-491d-4d55-bade-a40d5e8b32fb",
   "metadata": {},
   "outputs": [],
   "source": [
    "import cProfile, pstats\n",
    "from pstats import SortKey"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2fba575e-2635-4598-a74a-d4036c1816db",
   "metadata": {},
   "source": [
    "### Load data"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "92686bb0-85ed-4bb3-99eb-f5fc6c100787",
   "metadata": {},
   "source": [
    "For this notebook, we'll load the `PatronusAIFinanceBenchDataset` llama-dataset from [llamahub](https://llamahub.ai)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a3b94d62-efa4-479a-9215-e094b5a73061",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Successfully downloaded PatronusAIFinanceBenchDataset to ./data\n"
     ]
    }
   ],
   "source": [
    "!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f49f7e5b-6430-426b-b239-e9280ea7b229",
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core import SimpleDirectoryReader\n",
    "\n",
    "documents = SimpleDirectoryReader(input_dir=\"./data/source_files\").load_data()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5b00be91-22ea-403c-b9c4-cd030b7e6c09",
   "metadata": {},
   "source": [
    "### Define our IngestionPipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1089adee-bc8a-457f-8d96-113435923d10",
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core import Document\n",
    "from llama_index.embeddings.openai import OpenAIEmbedding\n",
    "from llama_index.core.node_parser import SentenceSplitter\n",
    "from llama_index.core.extractors import TitleExtractor\n",
    "from llama_index.core.ingestion import IngestionPipeline\n",
    "\n",
    "# create the pipeline with transformations\n",
    "pipeline = IngestionPipeline(\n",
    "    transformations=[\n",
    "        SentenceSplitter(chunk_size=1024, chunk_overlap=20),\n",
    "        TitleExtractor(),\n",
    "        OpenAIEmbedding(),\n",
    "    ]\n",
    ")\n",
    "\n",
    "# We'll be measuring performance with timeit and cProfile,\n",
    "# so disable the cache to keep repeated runs comparable.\n",
    "pipeline.disable_cache = True"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1937fbaa-0cef-494d-b3e1-a5ff268fd8d2",
   "metadata": {},
   "source": [
    "### Parallel Execution"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cf20d688-5994-4cd7-8f52-079b686328fb",
   "metadata": {},
   "source": [
    "A single run. Setting `num_workers` to a value greater than 1 will invoke parallel execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b5d68e6a-a658-46e8-9b71-d857b1c90d50",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:01<00:00,  3.74it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.25it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.54it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.27it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.43it/s]\n"
     ]
    }
   ],
   "source": [
    "nodes = pipeline.run(documents=documents, num_workers=4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2627d411-8fad-43ca-a6f0-533635e0c613",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5297"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(nodes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8bffaefb-f710-4187-a19f-11d10ebae82b",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:01<00:00,  3.65it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.05it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.14it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  2.83it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.07it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.64it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  5.26it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  6.17it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.80it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.12it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.56it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  6.90it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  2.86it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.15it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.10it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  5.10it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.17it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.75it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.46it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.03it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  7.27it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.25it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.09it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.93it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.51it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  6.94it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.34it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.66it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.84it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.64it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.69it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.43it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  5.24it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.01it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.52it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.82it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  5.39it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.57it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.55it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.58it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "29 s ± 1.56 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%timeit pipeline.run(documents=documents, num_workers=4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4d514fe-313b-4b88-8122-c5a44db210df",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:01<00:00,  4.26it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.44it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.14it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.31it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.72it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Tue Jan  9 14:59:20 2024    newstats\n",
      "\n",
      "         2048 function calls in 29.897 seconds\n",
      "\n",
      "   Ordered by: cumulative time\n",
      "   List reduced from 214 to 15 due to restriction <15>\n",
      "\n",
      "   ncalls  tottime  percall  cumtime  percall filename:lineno(function)\n",
      "        1    0.000    0.000   29.897   29.897 {built-in method builtins.exec}\n",
      "        1    0.057    0.057   29.896   29.896 <string>:1(<module>)\n",
      "        1    0.000    0.000   29.840   29.840 pipeline.py:378(run)\n",
      "       12    0.000    0.000   29.784    2.482 threading.py:589(wait)\n",
      "       12    0.000    0.000   29.784    2.482 threading.py:288(wait)\n",
      "       75   29.784    0.397   29.784    0.397 {method 'acquire' of '_thread.lock' objects}\n",
      "        1    0.000    0.000   29.783   29.783 pool.py:369(starmap)\n",
      "        1    0.000    0.000   29.782   29.782 pool.py:767(get)\n",
      "        1    0.000    0.000   29.782   29.782 pool.py:764(wait)\n",
      "        1    0.000    0.000    0.045    0.045 context.py:115(Pool)\n",
      "        1    0.000    0.000    0.045    0.045 pool.py:183(__init__)\n",
      "        1    0.000    0.000    0.043    0.043 pool.py:305(_repopulate_pool)\n",
      "        1    0.000    0.000    0.043    0.043 pool.py:314(_repopulate_pool_static)\n",
      "        4    0.000    0.000    0.043    0.011 process.py:110(start)\n",
      "        4    0.000    0.000    0.043    0.011 context.py:285(_Popen)\n",
      "\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<pstats.Stats at 0x139cbc1f0>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cProfile.run(\n",
    "    \"pipeline.run(documents=documents, num_workers=4)\",\n",
    "    \"newstats\",\n",
    ")\n",
    "p = pstats.Stats(\"newstats\")\n",
    "p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f482139e-1d0b-41ac-bff0-0c4a86a3ce62",
   "metadata": {},
   "source": [
    "### Async Parallel Execution"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4b1e3ede-1ff4-430c-abfb-270be055ff71",
   "metadata": {},
   "source": [
    "Here the `ProcessPoolExecutor` from `concurrent.futures` is used to execute processes asynchronously. The tasks being processed are blocking, but they are also performed asynchronously on the individual processes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3ce7856e-66ee-44ac-94c6-85082d75d327",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:01<00:00,  3.78it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.33it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.96it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.73it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.26it/s]\n"
     ]
    }
   ],
   "source": [
    "nodes = await pipeline.arun(documents=documents, num_workers=4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5c980287-082e-4b0f-b85d-ee3362400eff",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5297"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(nodes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a0a0bf6c-510c-44b3-b9f6-570593321817",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:01<00:00,  4.61it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  6.02it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.78it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.78it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.45it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.30it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  5.27it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.55it/s]\n",
      "100%|██████████| 5/5 [00:02<00:00,  1.92it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.53it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  5.50it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.81it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.69it/s]\n",
      "100%|██████████| 5/5 [00:02<00:00,  2.26it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.78it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.70it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.99it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.44it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.45it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.60it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.81it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.67it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.97it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  2.70it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.52it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.20it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.31it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.84it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.06it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.65it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.39it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.78it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.68it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.64it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  2.36it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.88it/s]\n",
      "100%|██████████| 5/5 [00:00<00:00,  6.65it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.55it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  3.25it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  3.87it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "20.3 s ± 6.01 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "import asyncio\n",
    "\n",
    "loop = asyncio.get_event_loop()\n",
    "%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "29209f1e-56a2-4d39-b983-59121a6f1009",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:01<00:00,  3.55it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.64it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  4.65it/s]\n",
      "100%|██████████| 5/5 [00:01<00:00,  2.83it/s]\n",
      "100%|██████████| 2/2 [00:00<00:00,  3.81it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Tue Jan  9 15:02:31 2024    async-newstats\n",
      "\n",
      "         2780 function calls in 21.186 seconds\n",
      "\n",
      "   Ordered by: cumulative time\n",
      "   List reduced from 302 to 15 due to restriction <15>\n",
      "\n",
      "   ncalls  tottime  percall  cumtime  percall filename:lineno(function)\n",
      "        1    0.000    0.000   21.186   21.186 {built-in method builtins.exec}\n",
      "        1    0.046    0.046   21.186   21.186 <string>:1(<module>)\n",
      "        1    0.000    0.000   21.140   21.140 nest_asyncio.py:87(run_until_complete)\n",
      "       14    0.000    0.000   21.140    1.510 nest_asyncio.py:101(_run_once)\n",
      "       14    0.000    0.000   20.797    1.486 selectors.py:554(select)\n",
      "       14   20.797    1.486   20.797    1.486 {method 'control' of 'select.kqueue' objects}\n",
      "       27    0.000    0.000    0.343    0.013 events.py:78(_run)\n",
      "       27    0.000    0.000    0.342    0.013 {method 'run' of '_contextvars.Context' objects}\n",
      "        2    0.000    0.000    0.342    0.171 nest_asyncio.py:202(step)\n",
      "        2    0.000    0.000    0.342    0.171 tasks.py:215(__step)\n",
      "        2    0.000    0.000    0.342    0.171 {method 'send' of 'coroutine' objects}\n",
      "        2    0.000    0.000    0.342    0.171 pipeline.py:478(arun)\n",
      "       66    0.245    0.004    0.245    0.004 {method 'acquire' of '_thread.lock' objects}\n",
      "        1    0.000    0.000    0.244    0.244 tasks.py:302(__wakeup)\n",
      "        1    0.000    0.000    0.244    0.244 _base.py:648(__exit__)\n",
      "\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<pstats.Stats at 0x1037abb80>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "loop = asyncio.get_event_loop()\n",
    "cProfile.run(\n",
    "    \"loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))\",\n",
    "    \"async-newstats\",\n",
    ")\n",
    "p = pstats.Stats(\"async-newstats\")\n",
    "p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "20e345d8-0524-4e1b-8d11-88a2a916196e",
   "metadata": {},
   "source": [
    "### Sequential Execution"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "80091185-d7ac-4ff2-aba4-e1ba5546a865",
   "metadata": {},
   "source": [
    "By default, `num_workers` is set to `None`, which invokes sequential execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9b31aabf-da4d-4a4a-b92c-2b83a75b296a",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.10it/s]\n"
     ]
    }
   ],
   "source": [
    "nodes = pipeline.run(documents=documents)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3a116fd0-829e-4138-8461-ee4da5708f61",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5297"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(nodes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1ac8b9c1-9129-43e6-9d7d-cd50b3abc953",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00,  5.96it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.80it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.58it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.14it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.19it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.41it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.28it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  2.75it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1min 11s ± 3.37 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%timeit pipeline.run(documents=documents)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9bf08074-3bb1-46bb-86f0-aca8e103e619",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.95it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Tue Jan  9 15:14:23 2024    oldstats\n",
      "\n",
      "         5514413 function calls (5312843 primitive calls) in 74.119 seconds\n",
      "\n",
      "   Ordered by: cumulative time\n",
      "   List reduced from 1253 to 15 due to restriction <15>\n",
      "\n",
      "   ncalls  tottime  percall  cumtime  percall filename:lineno(function)\n",
      "        1    0.000    0.000   74.125   74.125 {built-in method builtins.exec}\n",
      "        1    0.057    0.057   74.125   74.125 <string>:1(<module>)\n",
      "        1    0.000    0.000   74.068   74.068 pipeline.py:378(run)\n",
      "        1    0.000    0.000   74.068   74.068 pipeline.py:53(run_transformations)\n",
      "        1    0.010    0.010   66.055   66.055 base.py:334(__call__)\n",
      "        1    0.007    0.007   65.996   65.996 base.py:234(get_text_embedding_batch)\n",
      "       53    0.000    0.000   65.976    1.245 openai.py:377(_get_text_embeddings)\n",
      "       53    0.000    0.000   65.975    1.245 __init__.py:287(wrapped_f)\n",
      "       53    0.003    0.000   65.975    1.245 __init__.py:369(__call__)\n",
      "       53    0.001    0.000   65.966    1.245 openai.py:145(get_embeddings)\n",
      "       53    0.001    0.000   65.947    1.244 embeddings.py:33(create)\n",
      "       53    0.001    0.000   65.687    1.239 _base_client.py:1074(post)\n",
      "       53    0.000    0.000   65.680    1.239 _base_client.py:844(request)\n",
      "       53    0.002    0.000   65.680    1.239 _base_client.py:861(_request)\n",
      "       53    0.001    0.000   64.171    1.211 _client.py:882(send)\n",
      "\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<pstats.Stats at 0x154f45060>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cProfile.run(\"pipeline.run(documents=documents)\", \"oldstats\")\n",
    "p = pstats.Stats(\"oldstats\")\n",
    "p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b404fef3-ea1c-4b38-a558-c5be27bdd9f7",
   "metadata": {},
   "source": [
    "### Async on Main Process"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "eb7b8e77-199c-4afc-870d-91fafc112f8e",
   "metadata": {},
   "source": [
    "As in the sync case, `num_workers` defaults to `None`, which leads to single-batch execution of the async tasks."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dca073ac-ed85-4d29-821e-2acf37ea5525",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.18it/s]\n"
     ]
    }
   ],
   "source": [
    "nodes = await pipeline.arun(documents=documents)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1173231f-bea7-49b6-895c-25ac4aa352b3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5297"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(nodes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bb37efa7-3936-4cf8-a029-fcba95205218",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.11it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.18it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.60it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.93it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.19it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.22it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  4.83it/s]\n",
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.85it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "20.5 s ± 7.02 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%timeit loop.run_until_complete(pipeline.arun(documents=documents))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4d54d99c-0ea9-46cb-8b4e-9fd97ef2b7d7",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.31it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Tue Jan  9 15:17:38 2024    async-oldstats\n",
      "\n",
      "         6967591 function calls (6754866 primitive calls) in 28.185 seconds\n",
      "\n",
      "   Ordered by: cumulative time\n",
      "   List reduced from 1210 to 15 due to restriction <15>\n",
      "\n",
      "   ncalls  tottime  percall  cumtime  percall filename:lineno(function)\n",
      "        1    0.000    0.000   28.191   28.191 {built-in method builtins.exec}\n",
      "        1    0.000    0.000   28.191   28.191 <string>:1(<module>)\n",
      "        1    0.008    0.008   28.191   28.191 nest_asyncio.py:87(run_until_complete)\n",
      "     5111    0.046    0.000   28.181    0.006 nest_asyncio.py:101(_run_once)\n",
      "     5111    0.031    0.000   18.727    0.004 selectors.py:554(select)\n",
      "     8561   18.696    0.002   18.696    0.002 {method 'control' of 'select.kqueue' objects}\n",
      "     8794    0.010    0.000    9.356    0.001 events.py:78(_run)\n",
      "     8794    0.007    0.000    9.346    0.001 {method 'run' of '_contextvars.Context' objects}\n",
      "     4602    0.007    0.000    9.154    0.002 nest_asyncio.py:202(step)\n",
      "     4602    0.024    0.000    9.147    0.002 tasks.py:215(__step)\n",
      "     4531    0.003    0.000    9.093    0.002 {method 'send' of 'coroutine' objects}\n",
      "       16    0.000    0.000    6.004    0.375 pipeline.py:478(arun)\n",
      "       16    0.000    0.000    6.004    0.375 pipeline.py:88(arun_transformations)\n",
      "        1    0.000    0.000    5.889    5.889 schema.py:130(acall)\n",
      "        1    0.000    0.000    5.889    5.889 interface.py:108(__call__)\n",
      "\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<pstats.Stats at 0x2b157f310>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cProfile.run(\n",
    "    \"loop.run_until_complete(pipeline.arun(documents=documents))\",\n",
    "    \"async-oldstats\",\n",
    ")\n",
    "p = pstats.Stats(\"async-oldstats\")\n",
    "p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5c9bfb79-2def-462c-b3bb-d446e3bb9463",
   "metadata": {},
   "source": [
    "### In Summary"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "702e90d6-013a-43d6-8c49-dbd78709587a",
   "metadata": {},
   "source": [
    "The results of the experiments above are repeated below, with the strategies listed from fastest to slowest for this example dataset and pipeline.\n",
    "\n",
    "1. (Async, Parallel Processing): 20.3s\n",
    "2. (Async, No Parallel Processing): 20.5s\n",
    "3. (Sync, Parallel Processing): 29s\n",
    "4. (Sync, No Parallel Processing): 1min 11s\n",
    "\n",
    "Both strategies that use Parallel Processing outperform Sync, No Parallel Processing (i.e., `.run(num_workers=None)`). For Async tasks, at least in this case, there is little gain from adding Parallel Processing: 20.3s versus 20.5s, a difference well within the run-to-run variation reported by `%timeit` (± 6.01 s and ± 7.02 s). For larger workloads and IngestionPipelines, Async with Parallel Processing may lead to larger gains."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llama_index_3.10",
   "language": "python",
   "name": "llama_index_3.10"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
